package com.dbflow5.reactivestreams.transaction;

import com.dbflow5.database.DatabaseWrapper;
import com.dbflow5.query.ModelQueriable;
import com.dbflow5.reactivestreams.query.TableChangeOnSubscribe;
import com.dbflow5.transaction.Transaction;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeObserver;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

public class TransactionObservable {

    /**
     * Wraps a transaction builder in a {@link MaybeTransaction}.
     *
     * @param builder the transaction builder handed to the new {@link MaybeTransaction}
     * @param <R> the result type of the transaction
     * @return a new {@link MaybeTransaction} backed by {@code builder}
     */
    public static <R> MaybeTransaction<R> asMaybe(Transaction.Builder<R> builder) {
        final MaybeTransaction<R> maybe = new MaybeTransaction<R>(builder);
        return maybe;
    }

    /**
     * Wraps a transaction builder in a {@link SingleTransaction}.
     *
     * @param builder the transaction builder handed to the new {@link SingleTransaction}
     * @param <R> the result type of the transaction
     * @return a new {@link SingleTransaction} backed by {@code builder}
     */
    public static <R> SingleTransaction<R> asSingle(Transaction.Builder<R> builder) {
        final SingleTransaction<R> single = new SingleTransaction<R>(builder);
        return single;
    }

    /**
     * Creates a {@link Flowable} whose source is a {@link TableChangeOnSubscribe} built from the
     * given query and evaluation function.
     *
     * <p>The flowable is created with {@link BackpressureStrategy#LATEST}.
     *
     * @param modelQueriable the query passed to the {@link TableChangeOnSubscribe}
     * @param evalFn the function passed to the {@link TableChangeOnSubscribe}; it receives a
     *     {@link ModelQueriable} and a {@link DatabaseWrapper} and produces the emitted value
     *     (presumably invoked on table changes - confirm against {@link TableChangeOnSubscribe})
     * @param <T> the model type of the query
     * @param <R> the type of item emitted by the flowable
     * @return the newly created {@link Flowable}
     */
    public static <T, R> Flowable<R> asFlowable(ModelQueriable<T> modelQueriable, BiFunction<ModelQueriable<T>, DatabaseWrapper, R> evalFn) {
        final BackpressureStrategy strategy = BackpressureStrategy.LATEST;
        return Flowable.create(
                new TableChangeOnSubscribe<>(modelQueriable, evalFn),
                strategy);
    }

    /**
     * {@link Disposable} that cancels the wrapped {@link Transaction} when disposed.
     *
     * <p>Disposal is idempotent and safe to call from any thread: the transaction is cancelled
     * only by the first call to {@link #dispose()}, and the disposed state is immediately visible
     * to {@link #isDisposed()} on other threads.
     */
    public static class TransactionDisposable implements Disposable {
        private final Transaction<?> transaction;

        // Atomic so that a dispose on one thread is visible to isDisposed() on another, and so
        // that concurrent or repeated dispose() calls cancel the transaction exactly once.
        private final AtomicBoolean disposed = new AtomicBoolean();

        /**
         * @param transaction the transaction to cancel when this disposable is disposed
         */
        public TransactionDisposable(Transaction<?> transaction) {
            this.transaction = transaction;
        }

        /**
         * @return {@code true} once {@link #dispose()} has been called
         */
        @Override
        public boolean isDisposed() {
            return disposed.get();
        }

        /**
         * Marks this disposable as disposed and cancels the transaction. Calls after the first
         * one are no-ops.
         */
        @Override
        public void dispose() {
            if (disposed.compareAndSet(false, true)) {
                transaction.cancel();
            }
        }
    }

    /**
     * {@link Single} that, on subscription, builds a {@link Transaction} from the supplied
     * builder, forwards its success and error callbacks to the observer, and executes it.
     *
     * @param <R> the result type of the transaction
     */
    public static class SingleTransaction<R> extends Single<R> {
        private final Transaction.Builder<R> builder;

        /**
         * @param builder the builder used to create a transaction on each subscription
         */
        public SingleTransaction(Transaction.Builder<R> builder) {
            super();
            this.builder = builder;
        }

        /**
         * The transaction started by the most recent subscription, or {@code null} once its
         * completion callback has run. Volatile because it is written both by the subscribing
         * thread and by the completion callback, which may run on a different thread.
         */
        public volatile Transaction<R> transaction = null;

        /**
         * Builds the transaction, hands the observer a {@link TransactionDisposable} for it and
         * executes it, unless the observer disposed during {@code onSubscribe}.
         *
         * @param observer the observer receiving the transaction result or error
         */
        @Override
        public void subscribeActual(SingleObserver<? super R> observer) {
            Transaction<R> transaction = builder.success((rTransaction, r) -> {
                observer.onSuccess(r);
                return null;
            }).error((rTransaction, throwable) -> {
                observer.onError(throwable);
                return null;
            }).completion(rTransaction -> {
                this.transaction = null;
                return null;
            }).build();

            TransactionDisposable disposable = new TransactionDisposable(transaction);
            observer.onSubscribe(disposable);
            if (disposable.isDisposed()) {
                // The observer disposed synchronously inside onSubscribe; the transaction has
                // already been cancelled, so it must not be started afterwards.
                return;
            }
            this.transaction = transaction;
            transaction.execute();
        }
    }

    /**
     * {@link Maybe} that, on subscription, builds a {@link Transaction} from the supplied
     * builder, forwards its outcome to the observer, and executes it.
     *
     * <p>The observer receives exactly one terminal signal: {@code onSuccess} for a non-null
     * result, {@code onComplete} for a null result or when only the completion callback runs,
     * and {@code onError} for a failure.
     *
     * @param <R> the result type of the transaction
     */
    public static class MaybeTransaction<R> extends Maybe<R> {
        private final Transaction.Builder<R> builder;

        /**
         * @param builder the builder used to create a transaction on each subscription
         */
        public MaybeTransaction(Transaction.Builder<R> builder) {
            super();
            this.builder = builder;
        }

        /**
         * The transaction started by the most recent subscription, or {@code null} once its
         * completion callback has run. Volatile because it is written both by the subscribing
         * thread and by the completion callback, which may run on a different thread.
         */
        public volatile Transaction<R> transaction = null;

        /**
         * Builds the transaction, hands the observer a {@link TransactionDisposable} for it and
         * executes it, unless the observer disposed during {@code onSubscribe}.
         *
         * @param observer the observer receiving the transaction outcome
         */
        @Override
        public void subscribeActual(MaybeObserver<? super R> observer) {
            // A MaybeObserver may receive only one of onSuccess / onError / onComplete. The
            // completion callback can run in addition to the success or error callback
            // (NOTE(review): Transaction is not visible here - confirm), so every terminal
            // signal is gated on this flag and only the first one is delivered.
            final AtomicBoolean terminated = new AtomicBoolean();

            Transaction<R> transaction = builder.success((rTransaction, r) -> {
                if (terminated.compareAndSet(false, true)) {
                    if (r != null) {
                        observer.onSuccess(r);
                    } else {
                        // onSuccess requires a non-null item; an absent result is an empty Maybe.
                        observer.onComplete();
                    }
                }
                return null;
            }).completion(rTransaction -> {
                this.transaction = null;
                if (terminated.compareAndSet(false, true)) {
                    observer.onComplete();
                }
                return null;
            }).error((rTransaction, throwable) -> {
                // An error arriving after another terminal signal cannot be delivered to the
                // observer without violating the protocol, so it is not forwarded.
                if (terminated.compareAndSet(false, true)) {
                    observer.onError(throwable);
                }
                return null;
            }).build();

            TransactionDisposable disposable = new TransactionDisposable(transaction);
            observer.onSubscribe(disposable);
            if (disposable.isDisposed()) {
                // The observer disposed synchronously inside onSubscribe; the transaction has
                // already been cancelled, so it must not be started afterwards.
                return;
            }
            this.transaction = transaction;
            transaction.execute();
        }
    }
}